package com.xiaofan.apitest.tabletest

import com.xiaofan.apitest.source.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

object TimeAndWindowTest {

  /** Input file used when no path is passed as the first program argument. */
  private val DefaultInputPath: String =
    "D:\\big-data\\code\\FlinkTutorial\\src\\main\\resources\\sensor.txt"

  /**
   * Demonstrates event-time group (tumbling) windows and over windows on a stream of
   * [[SensorReading]]s, each expressed once with the Table API and once with SQL.
   *
   * @param args optional; `args(0)` overrides the sensor input file path
   */
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val tabEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    val inputPath: String = args.headOption.getOrElse(DefaultInputPath)
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    // Each line is parsed as "id,timestamp,temperature"; fields are trimmed so that
    // "id, ts, temp" style input does not break toLong/toDouble.
    val dataStream: DataStream[SensorReading] = inputStream
      .map { line =>
        val fields: Array[String] = line.split(",").map(_.trim)
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
      }
      .assignTimestampsAndWatermarks(
        // NOTE(review): the out-of-orderness bound is 3 ms, but event timestamps below are
        // whole seconds scaled to millis — confirm whether Time.seconds(3) was intended.
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(3)) {
          // The reading's timestamp is multiplied by 1000 to obtain event time in millis.
          override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
        })

    // Expose `timestamp` as the event-time (rowtime) attribute `ts`.
    val sensorTable: Table =
      tabEnv.fromDataStream(dataStream, $"id", $"temperature", $"timestamp".rowtime() as "ts")
    // Register the table for the SQL queries below.
    tabEnv.createTemporaryView("sensor", sensorTable)

    // 1. Group window: per sensor, count and average temperature over 10-second tumbling windows.
    // 1.1. Table API
    val resultTable: Table = sensorTable
      .window(Tumble.over(10.seconds).on($"ts").as($"tw"))
      .groupBy($"id", $"tw")
      .select($"id", $"id".count, $"temperature".avg, $"tw".end)

    // 1.2. SQL
    val sqlResultTable: Table = tabEnv.sqlQuery(
      """
        |select
        | id,
        | count(id),
        | avg(temperature),
        | tumble_end(ts, interval '10' second)
        |from sensor
        |group by
        | id,
        | tumble(ts, interval '10' second)
        |""".stripMargin)

    // 2. Over window: for every reading of each sensor, aggregate it together with the
    //    two preceding rows of the same sensor (ordered by event time).
    // 2.1. Table API
    val overResultTable: Table = sensorTable
      .window(Over.partitionBy($"id").orderBy($"ts").preceding(2.rows).as($"ow"))
      // `$"id"` (field reference), not the string literal "$id".
      .select($"id", $"ts", $"id".count.over($"ow"), $"temperature".avg.over($"ow"))

    // 2.2. SQL
    val sqlOverWindowResult: Table = tabEnv.sqlQuery(
      """
        |select
        | id,
        | ts,
        | count(id) over ow,
        | avg(temperature) over ow
        |from sensor
        |window ow as (
        | partition by id
        | order by ts
        | rows between 2 preceding and current row
        |)
        |""".stripMargin)

    tabEnv.toAppendStream[Row](resultTable).print("table")
    tabEnv.toRetractStream[Row](sqlResultTable).print("sql")
    // Without a sink the over-window queries would never be executed.
    tabEnv.toAppendStream[Row](overResultTable).print("over table")
    tabEnv.toAppendStream[Row](sqlOverWindowResult).print("over sql")

    env.execute("time and window test")

  }
}





















